[MCP] Model Context protocolでSSEをやってみる

[MCP] Model Context protocolでSSEをやってみる

Clock Icon2024.12.13

Introduction

先日Model Context protocol(以下 MCP)が発表され、弊社ブログでもいくつか記事がでています。
この標準プロトコルを使うことで、任意のリソースと AI システムとを簡単に接続できます。

MCP にはここにあるように、2 つの標準トランスポート実装が含まれています。

​1 つは標準入出力 (StdioServerTransport)で、
標準の入力および出力ストリームの通信を可能にします。
これは、ローカルで MCP server を起動して通信する場合に使います。

//MCP Serverコードサンプル
const transport = new StdioClientTransport({
  command: "npx",
  args: ["tsx", "path/your/mcp-server.ts"],
});

const client = new Client(
  {
    name: "simple-stdio-client",
    version: "1.0.0",
  },
  {
    capabilities: {},
  }
);

StdioClientTransportでは、クライアントはMCP Serverをサブプロセスとして起動します。
サーバーは標準入力でJSON-RPCを受信して標準出力でレスポンスを返します。

StdioClientTransportを使ったコードについては公式ドキュメントや弊社 blogを見ていただくとして、
ここではSSEServerTransportをつかってみます。

Environment

  • MacBook Pro (14-inch, M3, 2023)
  • OS : MacOS 14.5
  • node : v18.20.x

StdioServerTransport

先ほども説明しましたが、StdioServerTransport は標準入出力(stdin/stdout)を使用して
JSON-RPC メッセージを送受信するトランスポートの仕組みです。
参照

StdioServerTransport の場合、クライアントは MCP サーバーをサブプロセスとして起動し、
ローカルプロセス間の通信に使用されます。
そのため、必然的に単一のクライアントとの通信のみとなります。
公式でも CLI ツールやプラグインシステムなどに適しているとのこと。

SSEServerTransport

SSEServerTransportは、SSE(Server-Sent Events)を使ったHTTP通信を実現するためのトランスポートメカニズムです。
サーバーは複数のクライアント接続を処理できる独立したプロセスとして動作します。

SSEServerTransportを使うことで、クラサバ間の通信にHTTP POSTを使用した、
サーバーtoクライアントのストリーミングを実現します。

特徴は以下。

  • HTTP/SSE(Server-Sent Events)を使用してネットワーク越しに通信する
  • リモートクライアントからの接続受付
  • 複数のクライアントを同時に処理可能
  • Webアプリやマイクロサービスなどのアーキテクチャに適している

Try

SSE(Server-Sent Events)をやってみる

ではSSEServerTransportを使ってMCP ServerとMCP Clientを実装してみます。

サーバー側の実装

SSEトランスポートでは、サーバーは複数のクライアント接続を処理できる独立したプロセスとして動作します。

ここにあるように、サーバーは以下2つのエンドポイントを実装しなければいけません。

  • 1.クライアントが接続を確立し、サーバーからメッセージを受信するためのSSEエンドポイント(/events)
  • 2.クライアントがサーバーにメッセージを送信するためのHTTP POSTエンドポイント(/messages)

下記のようにhttpモジュールをつかってエンドポイントを実装してます。
(別にexpressでもfastifyでもかまいません)

//sse-server.ts
・・・
    const httpServer = http.createServer(async (req, res) => {
      if (req.method === "GET" && req.url === "/events") {
        console.log("New SSE connection");
        
        // 新しい接続用のトランスポートを作成
        const transport = new SSEServerTransport("/messages", res);
        this.activeTransport = transport;
        await this.server.connect(transport);

        res.on("close", () => {
          console.log("SSE connection closed");
          if (this.activeTransport === transport) {
            this.activeTransport = null;
          }
        });

        this.startSending(transport);
        return;
      }

      if (req.method === "POST" && req.url?.startsWith("/messages")) {
        if (!this.activeTransport) {
          res.writeHead(400).end("No active transport");
          return;
        }
        //SSEServerTransportがメッセージ処理をして
        //JSON-RPCでクライアントへレスポンスを返す
        await this.activeTransport.handlePostMessage(req, res);
        return;
      }

      res.writeHead(404).end();
    });
・・・

クライアントが接続すると、サーバーはクライアントがメッセージを送信するために使用するURIを含んだイベントを送信します。
その後、クライアントは、受け取ったエンドポイントへのHTTP POSTリクエストを送信できます。

サーバ実装、sse-server.tsは下記。
接続すると1秒毎にクライアントにメッセージを送ります。

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import http from "http";

class SSEServer {
  private server: Server;
  private activeTransport: SSEServerTransport | null = null;

  constructor() {
    this.server = new Server(
      {
        name: "sse-server",
        version: "1.0.0",
      },
      {
        capabilities: {}
      }
    );
  }

  async start(port: number = 3000) {
    const httpServer = http.createServer(async (req, res) => {
      if (req.method === "GET" && req.url === "/events") {
        console.log("New SSE connection");
        
        // 新しい接続用のトランスポートを作成
        const transport = new SSEServerTransport("/messages", res);
        this.activeTransport = transport; 
        await this.server.connect(transport);

        res.on("close", () => {
          console.log("SSE connection closed");
          if (this.activeTransport === transport) {
            this.activeTransport = null;
          }
        });

        this.startSending(transport);
        return;
      }

      if (req.method === "POST" && req.url?.startsWith("/messages")) {
        if (!this.activeTransport) {
          res.writeHead(400).end("No active transport");
          return;
        }
        //POSTリクエストを処理
        await this.activeTransport.handlePostMessage(req, res);
        return;
      }

      res.writeHead(404).end();
    });

    httpServer.listen(port, () => {
      console.log(`SSE Server running at http://localhost:${port}`);
      console.log(`SSE endpoint: http://localhost:${port}/events`);
    });
  }

  private async startSending(transport: SSEServerTransport) {
    try {
      await transport.send({
        jsonrpc: "2.0",
        method: "sse/connection",
        params: { message: "SSE Connection established" }
      });

      let messageCount = 0;
      const interval = setInterval(async () => {
        messageCount++;
        const message = `Message ${messageCount} at ${new Date().toISOString()}`;

        try {
          await transport.send({
            jsonrpc: "2.0",
            method: "sse/message",
            params: { data: message }
          });

          console.log(`Sent: ${message}`);

          if (messageCount === 10) {
            clearInterval(interval);
            await transport.send({
              jsonrpc: "2.0",
              method: "sse/complete",
              params: { message: "Stream completed" }
            });
            console.log("Stream completed");
          }
        } catch (error) {
          console.error("Error sending message:", error);
          clearInterval(interval);
        }
      }, 1000);

    } catch (error) {
      console.error("Error in startSending:", error);
    }
  }
}

new SSEServer().start().catch(console.error);

クライアント側の実装

続いてクライアント側の実装です。
クライアントでは、EventSourceを使ってSSEを実装しています。

import { EventSource } from 'eventsource';
(global as any).EventSource = EventSource;
    const url = new URL("/events", serverUrl);
    this.transport = new SSEClientTransport(url);
    switch (message.method) {
      case "sse/connection":
        console.log("Connection established:", message.params.message);
        break;
      
      case "sse/message":
        const timestamp = new Date().toISOString();
        console.log(`[${timestamp}] Received:`, message.params.data);
        break;
      
      case "sse/complete":
        console.log("\nStream completed:", message.params.message);
        this.isCompleted = true;
        break;
    }

クライアントのコードは下記。

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { URL } from 'url';
import { EventSource } from 'eventsource';

(global as any).EventSource = EventSource;

class SSEClient {
  private client: Client;
  private transport: SSEClientTransport;
  private isCompleted: boolean = false;

  constructor(serverUrl: string = "http://localhost:3000") {
    const url = new URL("/events", serverUrl);
    this.transport = new SSEClientTransport(url);
    
    this.client = new Client(
      {
        name: "sse-client",
        version: "1.0.0",
      },
      {
        capabilities: {},
      }
    );
  }

  async connect() {
    try {
      await this.client.connect(this.transport);
      console.log("Connected to server");

      // メッセージハンドラーを設定
      this.transport.onmessage = (message: any) => {
        if (!message || typeof message.method !== 'string') {
          return;
        }

        switch (message.method) {
          case "sse/connection":
            console.log("Connection established:", message.params.message);
            break;
          
          case "sse/message":
            const timestamp = new Date().toISOString();
            console.log(`[${timestamp}] Received:`, message.params.data);
            break;
          
          case "sse/complete":
            console.log("\nStream completed:", message.params.message);
            this.isCompleted = true;
            break;
        }
      };

    } catch (error) {
      console.error("Connection error:", error);
      throw error;
    }
  }

  async waitForCompletion() {
    while (!this.isCompleted) {
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }

  async disconnect() {
    await this.client.close();
    console.log("Disconnected from server");
  }
}

async function main() {
  const serverUrl = process.env.SERVER_URL || "http://localhost:3000";
  console.log("Connecting to SSE server at:", serverUrl);
  
  const client = new SSEClient(serverUrl);
  try {
    await client.connect();
    await client.waitForCompletion();
  } catch (error) {
    console.error("Error:", error);
  } finally {
    await client.disconnect();
  }
}

if (import.meta.url.endsWith("sse-client.ts")) {
  main().catch(console.error);
}

実行

では実行してみます。
サーバの起動してからクライアントを起動してください。
複数のクライアントを同時に起動しても動作します。

下記のようなログになると思います。

% npx tsx sse-server.ts

SSE Server running at http://localhost:3000
SSE endpoint: http://localhost:3000/events
New SSE connection
Sent: Message 1 at 2024-12-13T13:39:15.347Z
・・・
Sent: Message 10 at 2024-12-13T13:39:24.356Z
Stream completed
SSE connection closed

クライアント側

% npx tsx sse-client.ts

Connecting to SSE server at: http://localhost:3000
Connected to server
[2024-12-13T13:39:15.348Z] Received: Message 1 at 2024-12-13T13:39:15.347Z
・・・
[2024-12-13T13:39:24.358Z] Received: Message 10 at 2024-12-13T13:39:24.356Z

Stream completed: Stream completed
Disconnected from server

なお、ngrokでサーバを外部に公開し、
割り当てられたURLをSERVER_URL環境変数に設定すれば、外部からもアクセス可能なのが確認できます。

% SERVER_URL=https://xxxxxx.ngrok-free.app npx tsx sse-client.ts

Summary

今回はSSEServerTransport/SSEClientTransportを使ってSSEを実装してみました。
複数接続もできますし、MCP + SSEでいろいろ使い道がありそうです。

References

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.